// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "flush_manager.h"

#include <algorithm>
#include <utility>

#include <table/block_based_table_factory.h>
#include "db/write_batch_internal.h"
namespace rocksdb {

const InternalKeyComparator
    FlushManager::FlushNode::internal_key_comparator_ = InternalKeyComparator(BytewiseComparator());
std::unordered_map<uint32_t, int64_t> FlushManager::flush_times_;
FlushManager::FlushNode::FlushNode(RangeArena *ra, Env *env, std::string &flush_dir,
                                   const ImmutableDBOptions &db_options,
                                   const EnvOptions &opt_env_options)
    : env_(env),
      db_options_(db_options),
      opt_env_options_(opt_env_options),
      range_arena_(ra),
      flush_dir_(flush_dir),
      kv_flush_last_size_(0),
      node_data_size_(0),
      last_flush_time_(0),
      flush_data_map_(sliceLessCompare) {}

FlushManager::FlushNode::~FlushNode() {}

Status FlushManager::FlushNode::Append() {
  Status s;
  auto map_iter = flush_data_map_.cbegin();
  while (map_iter != flush_data_map_.cend()) {
    // put/merge operation
    if (!map_iter->second.empty()) {
      s = sst_file_writer_->Put(map_iter->first, map_iter->second);
      assert(s.ok());
      node_data_size_ += map_iter->first.size() + map_iter->second.size();
      if (!s.ok())
        std::cout << "sst_file_writer_->Put ERROR" << std::endl;
    } else {
      // delete/single_delete operation
      s = sst_file_writer_->Delete(map_iter->first);
      node_data_size_ += map_iter->first.size();
      if (!s.ok())
        std::cout << "sst_file_writer_->Delete ERROR" << std::endl;
    }
    if (!s.ok()) {
      std::cout << "flushRecord ERROR" << std::endl;
      return Status::Corruption("flush append failed");
    }
    ++map_iter;
  }
  flush_data_map_.clear();
//  std::cout << node_data_size_ << std::endl;
  int64_t cur_time = 0;
  s = env_->GetCurrentTime(&cur_time);
  if (!s.ok()) {
    std::cout << "GetCurrentTime ERROR" << std::endl;
  }
  last_flush_time_ = cur_time;

  return s;
}

bool FlushManager::FlushNode::NeedFlush() {
  return range_arena_->IsNeedFlush(kv_flush_last_size_);
}

bool FlushManager::FlushNode::GetFlushData(const char *&entry, Status &s) {
  s = range_arena_->GetNeedFlushData(entry, kv_flush_last_size_);
  return entry != nullptr;
}

bool FlushManager::FlushNode::RangeArenaIsHandling() const {
  return range_arena_->IsHandling();
}

bool FlushManager::FlushNode::NeedDelete() const {
  return range_arena_->IsNeedDelete();
}

Status FlushManager::FlushNode::CreateFile(uint64_t flush_number) {
  Options options;
  options.table_factory = std::make_shared<BlockBasedTableFactory>();
  sst_file_writer_ = std::make_shared<SstFileWriter>
      (EnvOptions(), options, &internal_key_comparator_);
  std::string file_name = MakeTableFileName(flush_dir_, flush_number);
  Status s = sst_file_writer_->Open(file_name);
  if (!s.ok()) {
    std::cout << "ERROR... FlushManager::FlushQueue::addFlushNode"
                 " NewWritableFile failed." << std::endl;
    exit(1);
  }
  file_vector_.push_back(file_name);
  return s;
}

Status FlushManager::FlushNode::CloseFile() {
  Status s = sst_file_writer_->Finish();
  if (!s.ok()) {
    std::cout << "ERROR... FlushManager::FlushQueue::addFlushNode"
                 " NewWritableFile finish failed." << std::endl;
    exit(1);
  }
  return s;
}

Status FlushManager::FlushNode::DeleteFile() {
  // Delete the corresponding disk file
  for (const auto &file : file_vector_) {
    Status s = env_->DeleteFile(file);
    if (!s.ok()) {
      std::cout << "ERROR... FlushManager::FlushNode::DeleteFile"
                   " failed." << std::endl;
    }
  }
  file_vector_.clear();
  // Free memory block
  delete range_arena_;
  return Status::OK();
}

bool FlushManager::FlushNode::sliceLessCompare(const Slice &left_key, const Slice &right_key) {
  return internal_key_comparator_.Compare(left_key, right_key) < 0;
}

void FlushManager::FlushNode::AddToFlushMap(const char *entry) {
  uint32_t key_length;

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
  Status s;
  ValueType type;
  SequenceNumber seq;
  UnPackSequenceAndType(tag, &seq, &type);

  switch (type) {
    case kTypeValue:
    case kTypeMerge: {
      uint32_t value_length;
      const char *value_ptr = GetVarint32Ptr
          (key_ptr + key_length, key_ptr + key_length + 5, &value_length);
      flush_data_map_[Slice(key_ptr, key_length)] = Slice(value_ptr, value_length);
    }
      break;
    case kTypeSingleDeletion:
    case kTypeDeletion: {
      flush_data_map_[Slice(key_ptr, key_length)] = Slice("");
      break;
      default:std::cout << "FlushManager::FlushNode::addToFlushMap has WRONG TYPE" << std::endl;
      break;
    }
  }
}

void FlushManager::exitFlushBGThread() { flush_bg_thread_ = false; }

void FlushManager::addFlushNode(RangeArena *ra) {
  auto node = new FlushNode(ra, env_, flush_dir_,
                            immutable_db_options_, opt_env_options_);
  flush_queue_.push_back(node);
}

FlushManager::FlushManager(uint32_t column_family_id,
                           const ImmutableCFOptions &db_options,
                           const std::string& flush_dir_path,
                           MultiRangeArena *multi_range_arenas)
    : column_family_id_(column_family_id),
      env_(db_options.env),
      immutable_db_options_(ImmutableDBOptions()),
      env_options_(BuildDBOptions(immutable_db_options_,
                                  MutableDBOptions())),
      opt_env_options_(env_->OptimizeForLogWrite(env_options_,
                                                 BuildDBOptions(immutable_db_options_, MutableDBOptions()))),

      flush_dir_(flush_dir_path),
      multi_range_arenas_(multi_range_arenas),
      flush_queue_(FlushQueue()),
      flush_number_(1) {
  flush_times_[column_family_id] = 0;
  bg_thread_ = std::thread(&FlushManager::Flush, this);
}

FlushManager::~FlushManager() {
  flush_sleep_notified_ = true;
  exitFlushBGThread();
  flush_sleep_cv_.notify_all();
  bg_thread_.join();
  for (const auto &node : flush_queue_) {
    delete node;
  }
}

bool FlushManager::parseLogFileName(const std::string &filename,
                                 uint64_t *number) {
  auto cr_point = find(filename.crbegin(), filename.crend(), '.');
  if (std::string(cr_point.base(), filename.cend()) == "log") {
    auto cr_num = find(cr_point, filename.crend(), '/');
    *number = stoul(std::string(cr_num.base(), cr_point.base() + 2));
    return true;
  }
  return false;
}

Status FlushManager::DeleteWAL(FlushNode* node){
  Status s;
  // 落盘节点每循环一圈，执行一次删除WAL文件处理
  flush_times_[column_family_id_] = node->GetLastFlushTime();
  // 找到列族中落盘时间最早的时刻
  auto map_iter = flush_times_.cbegin();
  int64_t min_flush_time = map_iter->second;
  while(map_iter != flush_times_.cend()){
    if(map_iter->second < min_flush_time) {
      min_flush_time = map_iter->second;
    }
    ++map_iter;
  }
  // 获取WAL文件
  std::vector<std::string> all_files;
  std::vector<uint64_t> file_numbers;
  s = env_->GetChildren(flush_dir_, &all_files);
  for (const auto& f : all_files) {
    uint64_t number;
    if (parseLogFileName(f, &number)) {
      file_numbers.emplace_back(number);
    }
  }

  std::sort(file_numbers.begin(),file_numbers.end());
  // 根据时间删除预写日志文件
  int size = file_numbers.size();
  for(int i = 0; i < size - 1; ++i){
    uint64_t log_time = file_numbers[i] / 100;
    uint64_t flush_time = min_flush_time;
//    std::cout << min_flush_time << std::endl;
    if(log_time < flush_time){
      std::string file_number_str = ToString(file_numbers[i]);
      if(file_number_str.size() < 6){
        file_number_str = std::string(6 - file_number_str.size(),'0') + file_number_str;
      }
      std::string file_name = flush_dir_ + "/" + file_number_str + ".log";
//      std::cout << file_name << std::endl;
      s = env_->DeleteFile(file_name);
    }
  }
  return s;
}

Status FlushManager::Flush() {
  Status s;

  while (flush_bg_thread_) {
    // Add the range arena to the flush queue (this process is thread safe)
    RangeArena *ra;
    if (multi_range_arenas_->RangeArenaQue().TryPop(ra)) {
      addFlushNode(ra);
      continue;
    }
    // If there is no disk drop queue and no node, jump out of this cycle.
    if (flush_queue_.empty()) {
      // sleep 100ms
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      continue;
    }
    // In the memory block processing state, wait for the node processing to
    // complete
    FlushNode *node = flush_queue_.back();
    flush_queue_.pop_back();

    int64_t cur_time = 0;
    env_->GetCurrentTime(&cur_time);

    std::unique_lock<std::mutex> lock(flush_sleep_mutex_);
    int64_t diff_value = cur_time - node->GetLastFlushTime();
    // 落盘节点循环一圈后，需要沉睡。
    while (!flush_sleep_notified_ && diff_value < kMemFlushMaxInterval) {
//      std::cout << "DeleteWAL " << node->GetLastFlushTime() << std::endl;
      s = DeleteWAL(node);
      assert(s.ok());
      flush_sleep_cv_.wait_for(lock, std::chrono::seconds(kMemFlushMaxInterval - diff_value + 1));
      s = env_->GetCurrentTime(&cur_time);
      assert(s.ok());
      diff_value = cur_time - node->GetLastFlushTime();
    }
    flush_sleep_notified_ = false;
    lock.unlock();

    if (node->RangeArenaIsHandling()) {
      // wait node ok . or deleted. then flush new mem block.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      flush_queue_.push_back(node);
      continue;
    }

    // Does the memory block need to be deleted
    if (node->NeedDelete()) {
      s = node->DeleteFile();
      assert(s.ok());
      delete node;
      continue;
    }
    // Node flush
    if (node->NeedFlush()) {
      uint64_t flush_number = flush_number_.fetch_add(1, std::memory_order_relaxed);
      s = node->CreateFile(flush_number);
      assert(s.ok());
      const char *entry;
      while (node->GetFlushData(entry, s) && s.ok()) {
        // 为 map 添加数据, 并利用 map 对数据进行排序
        node->AddToFlushMap(entry);
        assert(s.ok());
      }
      // 从map中获取有序的数据
      s = node->Append();
      assert(s.ok());
      s = node->CloseFile();
      assert(s.ok());
    }
    flush_queue_.push_front(node);
  }
  return Status::OK();
}
}  // namespace rocksdb
